LakeSoul CDC 表格式
CDC (Change Data Capture) 是湖仓重要的数据源之一. LakeSoul CDC 表的目标是能够从在线 OLTP 数据库快速同步数据到 LakeSoul 表中,从而下游分析计算任务在较小的时间间隔后就可以读到在线数据库的同步数据,消除了传统 T+1 复制的开销。相比普通支持 Upsert 的表,CDC 表额外支持了删除行的操作。
LakeSoul CDC 表使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canal 和 Flink CDC 中导入 CDC 数据。LakeSoul 默认建表并不会启用 CDC 表格式,默认表仅支持 Upsert 操作。要开启对 CDC 的支持,需要在建表时增加额外的属性。
创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column
来配置 CDC 变更类型的列名。这一列需要是 string
类型,包含三种取值之一: update
, insert
, delete
.
在 LakeSoul 批量读数据的自动合并阶段(包括使用 Spark/Flink 批式读取),会保留最新的 insert
、update
数据,并自动过滤掉 delete
的行。而使用 Spark/Flink 流式增量读取时,会保留 CDC 变更列的值(也即包含了 insert
, update
, delete
),在 Flink 中,这一列会被自动转换为 Flink RowData 对象的 RowKind 字段的对应值,从而在 Flink 流式读取时完整支持了 Flink Changelog Stream 语义,能够进行增量计算。
创建 LakeSoul CDC 表
使用 Spark
使用 Spark Scala API 或者 SQL,假设操作类型列名为 change_type
:
- Scala
- SQL
import com.dmetasoul.lakesoul.tables.LakeSoulTable
LakeSoulTable.createTable(data, path).shortTableName("cdc_ingestion").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "change_type").create()
CREATE TABLE table_name (id string, date string, change_type string) USING lakesoul
PARTITIONED BY (date)
LOCATION 's3://lakesoul-bucket/table_path'
TBLPROPERTIES('lakesoul_cdc_change_column'='change_type',
'hashPartitions'='id',
'hashBucketNum'='2');
注意 LakeSoul CDC 表必须是主键表,并且主键列需要和 CDC 上游数据库表相同。
使用 Flink
请参考 Flink Connector
LakeSoul CDC 表的增量读取
LakeSoul 的增量采用的是主键分片的模式,因此增量数据写入时不需要与存量数据做合并操作。对于 CDC 表,增量数据就是原始的 CDC 流的内容。对 LakeSoul 表的 CDC 增量读取,可以完整保留 CDC 操作标记,即 insert、update、delete。2.2.0 版本起在 Spark 中已经支持了增量流式读取。
2.3.0 版本起,支持了 Flink Table Source,支持 Flink ChangeLog Stream 的流式增量读写,请参考 Flink Connector 。